package cn.zhangfusheng.elasticsearch.template;

import cn.zhangfusheng.elasticsearch.exception.GlobalSystemException;
import cn.zhangfusheng.elasticsearch.thread.ThreadLocalDetail;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RethrottleRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

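/**
 * Elasticsearch document operations (index, get, exists, delete, update, bulk, reindex,
 * update/delete by query) together with index-level helpers (create index, put mapping, aliases).
 * @author fusheng.zhang
 * @date 2021-09-10 12:16:01
 */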
@SuppressWarnings("UnusedReturnValue")
interface TemplateDocumentApi extends Template, TemplateLogFormat {

    Logger log = LoggerFactory.getLogger(TemplateDocumentApi.class);

    /**
     * Indexes a single document whose source is supplied as an {@link XContentBuilder}.
     * <p>
     * Builds an {@link IndexRequest} from the arguments and delegates to {@link #index(IndexRequest)}.
     *
     * @param id              document id
     * @param routing         routing value
     * @param indexName       target index
     * @param xContentBuilder document source
     * @return whatever {@link #index(IndexRequest)} returns for the built request
     */
    default boolean index(String id, String routing, String indexName, XContentBuilder xContentBuilder) {
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.routing(routing);
        indexRequest.id(id);
        indexRequest.source(xContentBuilder);
        return index(indexRequest);
    }

    /**
     * Indexes a single document whose source is supplied as a field map.
     * <p>
     * Builds an {@link IndexRequest} from the arguments and delegates to {@link #index(IndexRequest)}.
     *
     * @param id        document id
     * @param routing   routing value
     * @param indexName target index
     * @param sourceMap document source as field name to value
     * @return whatever {@link #index(IndexRequest)} returns for the built request
     */
    default boolean index(String id, String routing, String indexName, Map<String, Object> sourceMap) {
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.routing(routing);
        indexRequest.id(id);
        indexRequest.source(sourceMap);
        return index(indexRequest);
    }

    /**
     * Executes an {@link IndexRequest}.<br/>
     * The request's op type is always forced to {@code CREATE}, so this method can only create documents.
     * <p>
     * The request is first offered to {@code ThreadLocalDetail.addRequest}; when that call returns
     * {@code true} the request is not sent from here and the method returns {@code true} immediately
     * (presumably it has been queued for a later bulk execution - confirm against ThreadLocalDetail).
     *
     * @param request the index request; it is mutated (refresh policy, op type)
     * @return {@code true} if the request was accepted by {@code ThreadLocalDetail.addRequest}, or if the
     *         response status is {@code CREATED}
     * @throws GlobalSystemException wrapping any {@link IOException}
     */
    default boolean index(IndexRequest request) {
        try {
            ThreadLocalDetail.getRefreshPolicy().ifPresent(request::setRefreshPolicy);
            if (log.isDebugEnabled()) {
                log.debug("PUT {}", this.formatDsl(request));
            }
            // Force create-only semantics before handing the request over.
            request.opType(DocWriteRequest.OpType.CREATE);
            if (ThreadLocalDetail.addRequest(request)) {
                return true;
            }
            IndexResponse response = restHighLevelClient().index(request, ThreadLocalDetail.requestOptions());
            return response.status() == RestStatus.CREATED;
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Fetches a single document by id.
     * <p>
     * The request's refresh flag is always set to {@code true}. Source filtering is only configured
     * when at least one of {@code includes} / {@code excludes} is non-empty.
     *
     * @param index    index name, required
     * @param routing  routing value; only applied when not blank
     * @param id       document id; when blank the method returns {@code null} without calling Elasticsearch
     * @param includes source fields to include, may be null or empty
     * @param excludes source fields to exclude, may be null or empty
     * @return the get response, or {@code null} when {@code id} is blank
     * @throws GlobalSystemException when {@code index} is blank, or wrapping any {@link IOException}
     */
    default GetResponse get(String index, String routing, String id, List<String> includes, List<String> excludes) {
        if (StringUtils.isBlank(index)) {
            throw new GlobalSystemException("必须指定索引");
        }
        if (StringUtils.isBlank(id)) {
            return null;
        }
        GetRequest request = new GetRequest(index, id);
        request.refresh(true);
        if (StringUtils.isNotBlank(routing)) {
            request.routing(routing);
        }
        boolean hasIncludes = !CollectionUtils.isEmpty(includes);
        boolean hasExcludes = !CollectionUtils.isEmpty(excludes);
        if (hasIncludes || hasExcludes) {
            String[] includeFields = hasIncludes ? includes.toArray(new String[0]) : Strings.EMPTY_ARRAY;
            String[] excludeFields = hasExcludes ? excludes.toArray(new String[0]) : Strings.EMPTY_ARRAY;
            request.fetchSourceContext(new FetchSourceContext(true, includeFields, excludeFields));
        }
        if (log.isDebugEnabled()) {
            log.debug("GET {}", this.formatDsl(request));
        }
        try {
            return restHighLevelClient().get(request, ThreadLocalDetail.requestOptions());
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Checks whether a document exists.
     * <p>
     * Source fetching is disabled and stored fields are set to {@code "_none_"} on the request
     * before it is passed to the client's exists call.
     *
     * @param index   index name, required
     * @param routing routing value
     * @param id      document id, required
     * @return the result of the client's exists call
     * @throws GlobalSystemException when {@code index} or {@code id} is blank, or wrapping any {@link IOException}
     */
    default boolean exists(String index, String routing, String id) {
        if (StringUtils.isBlank(index)) {
            throw new GlobalSystemException("必须指定索引");
        }
        if (StringUtils.isBlank(id)) {
            throw new GlobalSystemException("请指定id");
        }
        GetRequest request = new GetRequest(index, id);
        request.routing(routing);
        request.fetchSourceContext(new FetchSourceContext(false));
        request.storedFields("_none_");
        if (log.isDebugEnabled()) {
            log.debug("HEAD {}", this.formatDsl(request));
        }
        try {
            return restHighLevelClient().exists(request, ThreadLocalDetail.requestOptions());
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Deletes a single document by id.
     * <p>
     * Validates the arguments, builds a {@link DeleteRequest} and delegates to {@link #delete(DeleteRequest)}.
     *
     * @param index   index name, required
     * @param routing routing value
     * @param id      document id, required
     * @return whatever {@link #delete(DeleteRequest)} returns for the built request
     * @throws GlobalSystemException when {@code index} or {@code id} is blank
     */
    default boolean delete(String index, String routing, String id) {
        if (StringUtils.isBlank(index)) {
            throw new GlobalSystemException("必须指定索引");
        }
        if (StringUtils.isBlank(id)) {
            throw new GlobalSystemException("请指定id");
        }
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        deleteRequest.routing(routing);
        return this.delete(deleteRequest);
    }

    /**
     * Executes a {@link DeleteRequest}.
     * <p>
     * The request is first offered to {@code ThreadLocalDetail.addRequest}; when that call returns
     * {@code true} the request is not sent from here and the method returns {@code true} immediately
     * (presumably it has been queued for a later bulk execution - confirm against ThreadLocalDetail).
     *
     * @param request the delete request; its refresh policy may be overwritten
     * @return {@code true} if the request was accepted by {@code ThreadLocalDetail.addRequest}, or if the
     *         response result is anything other than {@code NOT_FOUND}
     * @throws GlobalSystemException wrapping any {@link IOException}
     */
    default boolean delete(DeleteRequest request) {
        // Apply the thread-bound refresh policy, if one is set.
        ThreadLocalDetail.getRefreshPolicy().ifPresent(request::setRefreshPolicy);
        if (log.isDebugEnabled()) {
            log.debug("DELETE {}", this.formatDsl(request));
        }
        if (ThreadLocalDetail.addRequest(request)) {
            return true;
        }
        try {
            DeleteResponse response = restHighLevelClient().delete(request, ThreadLocalDetail.requestOptions());
            return response.getResult() != DocWriteResponse.Result.NOT_FOUND;
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Partially updates a single document by id.
     * <p>
     * Validates the arguments, builds an {@link UpdateRequest} whose doc is {@code xContentBuilder}
     * and delegates to {@link #update(UpdateRequest)}.
     *
     * @param index           index name, required
     * @param routing         routing value
     * @param id              document id, required
     * @param xContentBuilder the partial document
     * @return whatever {@link #update(UpdateRequest)} returns for the built request
     * @throws GlobalSystemException when {@code index} or {@code id} is blank
     */
    default boolean update(String index, String routing, String id, XContentBuilder xContentBuilder) {
        if (StringUtils.isBlank(index)) {
            throw new GlobalSystemException("必须指定索引");
        }
        if (StringUtils.isBlank(id)) {
            throw new GlobalSystemException("请指定id");
        }
        UpdateRequest updateRequest = new UpdateRequest(index, id);
        updateRequest.doc(xContentBuilder);
        updateRequest.routing(routing);
        return this.update(updateRequest);
    }

    /**
     * Executes an {@link UpdateRequest}.
     * <p>
     * The request is first offered to {@code ThreadLocalDetail.addRequest}; when that call returns
     * {@code true} the request is not sent from here and the method returns {@code true} immediately
     * (presumably it has been queued for a later bulk execution - confirm against ThreadLocalDetail).
     *
     * @param request the update request; its refresh policy may be overwritten
     * @return {@code true} if the request was accepted by {@code ThreadLocalDetail.addRequest}, or if the
     *         response result is anything other than {@code NOT_FOUND}
     * @throws GlobalSystemException wrapping any {@link IOException}
     */
    default boolean update(UpdateRequest request) {
        // Apply the thread-bound refresh policy, if one is set.
        ThreadLocalDetail.getRefreshPolicy().ifPresent(request::setRefreshPolicy);
        if (log.isDebugEnabled()) {
            log.debug("PUT {}", this.formatDsl(request));
        }
        if (ThreadLocalDetail.addRequest(request)) {
            return true;
        }
        try {
            UpdateResponse response = restHighLevelClient().update(request, ThreadLocalDetail.requestOptions());
            return response.getResult() != DocWriteResponse.Result.NOT_FOUND;
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Executes a bulk write request.
     * <p>
     * If the response reports any failed item, a {@link GlobalSystemException} carrying the
     * response's failure message is thrown; otherwise the method returns {@code true}, in line with
     * the other write methods of this interface, which report success as {@code true}.
     *
     * @param request the bulk request; its refresh policy may be overwritten
     * @return {@code true} when the bulk call completed without item failures
     * @throws GlobalSystemException when any item failed, or wrapping any {@link IOException}
     */
    default boolean bulkRequest(BulkRequest request) {
        ThreadLocalDetail.getRefreshPolicy().ifPresent(request::setRefreshPolicy);
        if (log.isDebugEnabled()) {
            log.debug("POST /_bulk {}", this.formatDsl(request));
        }
        try {
            BulkResponse bulkResponse = restHighLevelClient().bulk(request, ThreadLocalDetail.requestOptions());
            if (bulkResponse.hasFailures()) {
                throw new GlobalSystemException(bulkResponse.buildFailureMessage());
            }
            // Reaching this point means no item failed. The previous code returned
            // bulkResponse.hasFailures() here, which is always false at this point.
            return true;
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Copies documents from one or more source indices into a target index.
     * <p>
     * The reindex call is issued asynchronously: this method returns as soon as the request has been
     * submitted. The outcome is only logged by the listener (response at debug level, failure at
     * error level) and is not reported back to the caller.
     * TODO the purpose of reindex is not fully understood by the original author; this description may need revisiting.
     *
     * @param targetIndex destination index, required
     * @param sourceIndex source indices; at least one is required and none may be blank
     * @throws GlobalSystemException when {@code targetIndex} is blank, when no source index is given or
     *                               any of them is blank, or wrapping any exception raised while submitting
     */
    default void reindex(String targetIndex, String... sourceIndex) {
        if (StringUtils.isBlank(targetIndex)) {
            throw new GlobalSystemException("请设置目标索引");
        }
        // isAnyBlank does not reject an empty array, so the "no source index at all" case is checked explicitly.
        if (sourceIndex == null || sourceIndex.length == 0 || StringUtils.isAnyBlank(sourceIndex)) {
            throw new GlobalSystemException("请设置源索引");
        }
        ReindexRequest request = new ReindexRequest();
        request.setSourceIndices(sourceIndex).setDestIndex(targetIndex).setRefresh(true);
        if (log.isDebugEnabled()) {
            log.debug(request.toString());
        }
        try {
            restHighLevelClient().reindexAsync(request, ThreadLocalDetail.requestOptions(), new ActionListener<BulkByScrollResponse>() {
                @Override
                public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                    if (log.isDebugEnabled()) {
                        log.debug(bulkByScrollResponse.toString());
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    log.error(e.getMessage(), e);
                }
            });
        } catch (Exception e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Updates every document matching a query by running a script against it.
     * <p>
     * If {@code ThreadLocalDetail} holds a transaction task id it is set as the request's parent task.
     * If it holds a refresh policy, the request refreshes afterwards unless that policy is {@code NONE}.
     *
     * @param query   the query selecting documents to update
     * @param script  the script applied to each matching document
     * @param routing routing value
     * @param index   target indices; at least one is required and none may be blank
     * @return the number of updated documents reported by the response
     * @throws GlobalSystemException when no index is given or any of them is blank, or wrapping any
     *                               {@link IOException}
     */
    default long updateByQuery(QueryBuilder query, Script script, String routing, String... index) {
        // isAnyBlank does not reject an empty array; without this check a call with no index
        // would build a request that is not restricted to any index.
        if (index == null || index.length == 0 || StringUtils.isAnyBlank(index)) {
            throw new GlobalSystemException("请指定index");
        }
        UpdateByQueryRequest request = new UpdateByQueryRequest(index).setQuery(query).setRouting(routing).setScript(script);
        // Attach the thread-bound transaction task id, if any, as the parent task.
        ThreadLocalDetail.getTransactionTaskId().ifPresent(request::setParentTask);
        // Refresh after the request unless the thread-bound policy is NONE.
        ThreadLocalDetail.getRefreshPolicy().ifPresent(
                refreshPolicy -> request.setRefresh(!Objects.equals(refreshPolicy, WriteRequest.RefreshPolicy.NONE)));
        try {
            if (log.isDebugEnabled()) {
                log.debug("POST {}", this.formatDsl(request));
            }
            BulkByScrollResponse bulkByScrollResponse = restHighLevelClient().updateByQuery(request, ThreadLocalDetail.requestOptions());
            return bulkByScrollResponse.getUpdated();
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Deletes every document matching a query.
     * <p>
     * If {@code ThreadLocalDetail} holds a transaction task id it is set as the request's parent task.
     * If it holds a refresh policy, the request refreshes afterwards unless that policy is {@code NONE}.
     *
     * @param query   the query selecting documents to delete
     * @param routing routing value
     * @param index   target indices; at least one is required and none may be blank
     * @return the number of deleted documents reported by the response
     * @throws GlobalSystemException when no index is given or any of them is blank, or wrapping any
     *                               {@link IOException}
     */
    default long deleteByQuery(QueryBuilder query, String routing, String... index) {
        // isAnyBlank does not reject an empty array; without this check a call with no index
        // would build a delete request that is not restricted to any index.
        if (index == null || index.length == 0 || StringUtils.isAnyBlank(index)) {
            throw new GlobalSystemException("请指定index");
        }
        DeleteByQueryRequest request = new DeleteByQueryRequest(index).setQuery(query).setRouting(routing);
        if (log.isDebugEnabled()) {
            log.debug(request.toString());
        }
        // Attach the thread-bound transaction task id, if any, as the parent task.
        ThreadLocalDetail.getTransactionTaskId().ifPresent(request::setParentTask);
        // Refresh after the request unless the thread-bound policy is NONE.
        ThreadLocalDetail.getRefreshPolicy().ifPresent(
                refreshPolicy -> request.setRefresh(!Objects.equals(refreshPolicy, WriteRequest.RefreshPolicy.NONE)));
        try {
            BulkByScrollResponse bulkByScrollResponse = restHighLevelClient().deleteByQuery(request, ThreadLocalDetail.requestOptions());
            return bulkByScrollResponse.getDeleted();
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }

    /**
     * Sends rethrottle requests for the thread-bound transaction task.
     * <p>
     * Does nothing when {@code ThreadLocalDetail} holds no transaction task id. Otherwise one
     * {@link RethrottleRequest} for that task is sent to the reindex, update-by-query and
     * delete-by-query rethrottle endpoints, in that order.
     * TODO the original author noted that the purpose of this call is not yet understood.
     *
     * @throws GlobalSystemException wrapping any {@link IOException}
     */
    default void rethrottle() {
        ThreadLocalDetail.getTransactionTaskId().ifPresent(transactionTaskId -> {
            RethrottleRequest rethrottleRequest = new RethrottleRequest(transactionTaskId);
            log.debug(rethrottleRequest.toString());
            try {
                restHighLevelClient().reindexRethrottle(rethrottleRequest, ThreadLocalDetail.requestOptions());
                restHighLevelClient().updateByQueryRethrottle(rethrottleRequest, ThreadLocalDetail.requestOptions());
                restHighLevelClient().deleteByQueryRethrottle(rethrottleRequest, ThreadLocalDetail.requestOptions());
            } catch (IOException ioException) {
                throw new GlobalSystemException(ioException);
            }
        });
    }

    /**
     * Creates a new index together with its mapping and, optionally, its settings.
     *
     * @param indexName name of the index to create
     * @param mapping   mapping definition as JSON
     * @param setting   settings as JSON; only applied when not blank
     * @throws GlobalSystemException wrapping any exception raised while building or sending the request,
     *                               or thrown directly when the creation is not acknowledged
     */
    default void createIndexMapping(String indexName, String mapping, String setting) {
        boolean acknowledged;
        try {
            log.debug("create new index:[{}],mapping:{}", indexName, mapping);
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).mapping(mapping, XContentType.JSON);
            if (StringUtils.isNotBlank(setting)) {
                createIndexRequest.settings(setting, XContentType.JSON);
            }
            acknowledged = this.restHighLevelClient().indices().create(createIndexRequest, ThreadLocalDetail.requestOptions()).isAcknowledged();
        } catch (Exception e) {
            throw new GlobalSystemException(e);
        }
        // Checked outside the try block so this exception is not caught by the broad catch above
        // and wrapped in a second GlobalSystemException.
        if (!acknowledged) {
            throw new GlobalSystemException("创建index:[{}]的mapping失败", indexName);
        }
    }

    /**
     * Sends a put-mapping request for an index.
     * <p>
     * The acknowledged flag of the response is only written to the debug log; a non-acknowledged
     * response does not raise an error.
     *
     * @param indexName index whose mapping is being put
     * @param mapping   mapping definition as JSON
     * @throws GlobalSystemException wrapping any exception raised while building or sending the request
     */
    default void putMapping(String indexName, String mapping) {
        try {
            PutMappingRequest request = new PutMappingRequest(indexName);
            request.source(mapping, XContentType.JSON);
            AcknowledgedResponse response =
                    this.restHighLevelClient().indices().putMapping(request, ThreadLocalDetail.requestOptions());
            boolean acknowledged = response.isAcknowledged();
            log.debug("create index:{} mapping:{}, success?{}", indexName, mapping, acknowledged);
        } catch (Exception exception) {
            throw new GlobalSystemException(exception);
        }
    }

    /**
     * Checks whether the index (or indices) described by the request exist.
     *
     * @param getIndexRequest request naming the index or indices to check
     * @return the result of the client's index-exists call
     * @throws GlobalSystemException wrapping any exception raised by the call
     */
    default boolean exists(GetIndexRequest getIndexRequest) {
        boolean indexExists;
        try {
            indexExists = this.restHighLevelClient().indices().exists(getIndexRequest, ThreadLocalDetail.requestOptions());
        } catch (Exception exception) {
            throw new GlobalSystemException(exception);
        }
        return indexExists;
    }

    /**
     * Checks whether the given aliases exist.
     *
     * @param aliases alias names to look up
     * @return the result of the client's alias-exists call
     * @throws GlobalSystemException wrapping any {@link IOException}
     */
    default boolean indexExistsAlias(String... aliases) {
        GetAliasesRequest aliasesRequest = new GetAliasesRequest(aliases);
        try {
            return this.restHighLevelClient().indices().existsAlias(aliasesRequest, ThreadLocalDetail.requestOptions());
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Adds an alias to an index.
     *
     * @param indexName index that receives the alias
     * @param aliasName alias to add
     * @throws GlobalSystemException when the alias update is not acknowledged, or wrapping any {@link IOException}
     */
    default void indexSetAlias(String indexName, String aliasName) {
        try {
            IndicesAliasesRequest.AliasActions addAction =
                    new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD);
            addAction.index(indexName).alias(aliasName);
            IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
            aliasesRequest.addAliasAction(addAction);
            AcknowledgedResponse response =
                    this.restHighLevelClient().indices().updateAliases(aliasesRequest, ThreadLocalDetail.requestOptions());
            if (!response.isAcknowledged()) {
                throw new GlobalSystemException("index:[{}]的设置alias:[{}]失败", indexName, aliasName);
            }
        } catch (IOException ioException) {
            throw new GlobalSystemException(ioException);
        }
    }

    /**
     * Removes an alias from an index.
     *
     * @param indexName index that currently carries the alias
     * @param aliasName alias to remove
     * @throws GlobalSystemException when the alias update is not acknowledged, or wrapping any {@link IOException}
     */
    default void indexRemoveAlias(String indexName, String aliasName) {
        try {
            IndicesAliasesRequest.AliasActions aliasActions =
                    new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE).index(indexName).alias(aliasName);
            IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest().addAliasAction(aliasActions);
            AcknowledgedResponse acknowledgedResponse =
                    this.restHighLevelClient().indices().updateAliases(indicesAliasesRequest, ThreadLocalDetail.requestOptions());
            if (!acknowledgedResponse.isAcknowledged()) {
                // The message previously said "设置alias" (set alias), copied from indexSetAlias.
                throw new GlobalSystemException("index:[{}]的删除alias:[{}]失败", indexName, aliasName);
            }
        } catch (IOException e) {
            throw new GlobalSystemException(e);
        }
    }
}
